Apache Spark for Everyone - PySpark + Python

Markdown blocks communicate text, images + whatever other useful HTML bits you want to share.

Like TODO lists:

  • get bikes data set
  • import csv
  • do some things with pyspark
  • do some things with python
  • show a python vis?
  • save out file

And code bits:

from pyspark.sql import SQLContext

And a link to where you can check on your local Spark cluster (the Spark UI, by default at http://localhost:4040)

There's a great Markdown cheatsheet on GitHub here


In [1]:
# set your working directory if you want less pathy things later
WORK_DIR = '/Users/amcasari/repos/wwconnect-2016-spark4everyone/'

In [2]:
# create an RDD from bikes data
# sc is an existing SparkContext (initialized when PySpark starts)

bikes = sc.textFile(WORK_DIR + "data/bikes/p*")
bikes.count()


Out[2]:
17380
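
A quick aside: sc only exists because the PySpark shell / notebook kernel created it for us. In a standalone script you'd build it yourself; a minimal sketch, assuming a local master (the app name is just an example):

In [ ]:
# only needed outside the PySpark shell, where sc already exists
from pyspark import SparkContext
sc = SparkContext("local[*]", "spark4everyone")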

In [3]:
# import SQLContext, the Spark 1.x entry point for DataFrame + SQL functionality
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

In [4]:
# since we are familiar with pandas DataFrames, let's convert the RDD to a Spark DataFrame
# we'll try to infer the schema from the files

bikes_df = sqlContext.createDataFrame(bikes)


---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-4-ff71d9ce6d84> in <module>()
      2 # we'll try to infer the schema from the files
      3 
----> 4 bikes_df = sqlContext.createDataFrame(bikes)

/Users/amcasari/sandbox/spark/python/pyspark/sql/context.pyc in createDataFrame(self, data, schema, samplingRatio)
    421 
    422         if isinstance(data, RDD):
--> 423             rdd, schema = self._createFromRDD(data, schema, samplingRatio)
    424         else:
    425             rdd, schema = self._createFromLocal(data, schema)

/Users/amcasari/sandbox/spark/python/pyspark/sql/context.pyc in _createFromRDD(self, rdd, schema, samplingRatio)
    308         """
    309         if schema is None or isinstance(schema, (list, tuple)):
--> 310             struct = self._inferSchema(rdd, samplingRatio)
    311             converter = _create_converter(struct)
    312             rdd = rdd.map(converter)

/Users/amcasari/sandbox/spark/python/pyspark/sql/context.pyc in _inferSchema(self, rdd, samplingRatio)
    261 
    262         if samplingRatio is None:
--> 263             schema = _infer_schema(first)
    264             if _has_nulltype(schema):
    265                 for row in rdd.take(100)[1:]:

/Users/amcasari/sandbox/spark/python/pyspark/sql/types.pyc in _infer_schema(row)
    829 
    830     else:
--> 831         raise TypeError("Can not infer schema for type: %s" % type(row))
    832 
    833     fields = [StructField(k, _infer_type(v), True) for k, v in items]

TypeError: Can not infer schema for type: <type 'unicode'>
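
Side note on that error: textFile hands back an RDD of plain unicode strings, and createDataFrame can't infer a schema from those. If you launch with the third-party spark-csv package on the classpath, it will handle the header + schema inference in one call; a sketch (not what we'll do here, since doing it by hand shows the moving parts):

In [ ]:
# a sketch using the spark-csv package (assumes it was added at launch,
# e.g. via --packages com.databricks:spark-csv_2.10:1.4.0)
bikes_csv_df = (sqlContext.read
                .format('com.databricks.spark.csv')
                .options(header='true', inferSchema='true')
                .load(WORK_DIR + 'data/bikes/p*'))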

In [5]:
# whoops-a-daisy: let's remove the header, split out the rows, + programmatically specify the schema

names = bikes.first().replace('"','')
names


Out[5]:
u'instant,dteday,season,yr,mnth,hr,holiday,weekday,workingday,weathersit,temp,atemp,hum,windspeed,casual,registered,cnt'

In [6]:
# remove the header using subtract
bikesHeader = bikes.filter(lambda x: "instant" in x)
bikesFiltered = bikes.subtract(bikesHeader)
bikesFiltered.count()


Out[6]:
17379
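
subtract gets the job done, but it triggers a shuffle just to drop one line. An equivalent filter that skips the shuffle, assuming "instant" only ever appears in the header:

In [ ]:
# same result without the shuffle: keep every line that isn't the header
bikesFiltered = bikes.filter(lambda x: "instant" not in x)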

In [7]:
# programmatically specify the schema using StructFields
from pyspark.sql.types import *

fields = [StructField(field_name, StringType(), False) for field_name in names.split(',')]
fields


Out[7]:
[StructField(instant,StringType,false),
 StructField(dteday,StringType,false),
 StructField(season,StringType,false),
 StructField(yr,StringType,false),
 StructField(mnth,StringType,false),
 StructField(hr,StringType,false),
 StructField(holiday,StringType,false),
 StructField(weekday,StringType,false),
 StructField(workingday,StringType,false),
 StructField(weathersit,StringType,false),
 StructField(temp,StringType,false),
 StructField(atemp,StringType,false),
 StructField(hum,StringType,false),
 StructField(windspeed,StringType,false),
 StructField(casual,StringType,false),
 StructField(registered,StringType,false),
 StructField(cnt,StringType,false)]

In [8]:
schema = StructType(fields)
schema


Out[8]:
StructType(List(StructField(instant,StringType,false),StructField(dteday,StringType,false),StructField(season,StringType,false),StructField(yr,StringType,false),StructField(mnth,StringType,false),StructField(hr,StringType,false),StructField(holiday,StringType,false),StructField(weekday,StringType,false),StructField(workingday,StringType,false),StructField(weathersit,StringType,false),StructField(temp,StringType,false),StructField(atemp,StringType,false),StructField(hum,StringType,false),StructField(windspeed,StringType,false),StructField(casual,StringType,false),StructField(registered,StringType,false),StructField(cnt,StringType,false)))

In [9]:
# convert each line in the CSV to a tuple
parts = bikesFiltered.map(lambda l: l.split(","))
bikesSplit = parts.map(lambda p: (p[0], p[1], p[2], p[3], p[4], p[5], p[6], p[7], p[8], p[9], p[10],
                                 p[11], p[12], p[13], p[14], p[15], p[16]))
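
Since each line splits into exactly 17 fields, the same mapping fits in one call; a sketch, assuming no ragged rows:

In [ ]:
# equivalent shorthand: turn each split list straight into a tuple
bikesSplit = parts.map(tuple)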

In [10]:
# Apply the schema to the RDD.
bikes_df = sqlContext.createDataFrame(bikesSplit, schema)

In [11]:
bikes_df.show()


+-------+----------+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+---+
|instant|    dteday|season| yr|mnth| hr|holiday|weekday|workingday|weathersit|temp| atemp| hum|windspeed|casual|registered|cnt|
+-------+----------+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+---+
|  15086|2012-09-25|     4|  1|   9| 18|      0|      2|         1|         1|0.64|0.6212|0.41|   0.2239|    64|       758|822|
|   1060|2011-02-16|     1|  0|   2| 21|      0|      3|         1|         1|0.36|0.3485|0.46|    0.194|     5|        87| 92|
|   8138|2011-12-10|     4|  0|  12| 17|      0|      6|         0|         1|0.28|0.2576|0.36|   0.3284|    34|       151|185|
|  15068|2012-09-25|     4|  1|   9|  0|      0|      2|         1|         1|0.46|0.4545|0.67|   0.1642|     8|        56| 64|
|   6631|2011-10-08|     4|  0|  10| 20|      0|      6|         0|         1|0.52|   0.5|0.77|   0.1045|    78|       124|202|
|   4631|2011-07-16|     3|  0|   7| 20|      0|      6|         0|         1|0.72|0.6667|0.51|   0.2239|   108|       188|296|
|   2658|2011-04-25|     2|  0|   4| 15|      0|      1|         1|         1|0.74|0.6667|0.51|   0.2239|    50|       125|175|
|  14937|2012-09-19|     3|  1|   9| 13|      0|      3|         1|         1| 0.6|0.6212| 0.4|   0.2537|    55|       234|289|
|  16174|2012-11-11|     4|  1|  11| 15|      0|      0|         0|         1|0.56|0.5303|0.37|   0.2239|   304|       420|724|
|   2036|2011-03-30|     2|  0|   3| 16|      0|      3|         1|         3|0.28|0.2727|0.87|   0.2537|     0|        36| 36|
|    721|2011-02-02|     1|  0|   2|  9|      0|      3|         1|         2|0.24|0.2576|0.93|   0.0896|     4|       119|123|
|   1006|2011-02-14|     1|  0|   2| 13|      0|      1|         1|         1|0.58|0.5455|0.19|   0.3881|    27|        93|120|
|  14599|2012-09-05|     3|  1|   9| 11|      0|      3|         1|         2|0.78|0.7424|0.62|   0.1642|    61|       156|217|
|    852|2011-02-07|     1|  0|   2| 22|      0|      1|         1|         1|0.28| 0.303|0.81|   0.0896|     3|        34| 37|
|    724|2011-02-02|     1|  0|   2| 12|      0|      3|         1|         2|0.24|0.2273|0.93|   0.2239|     3|        61| 64|
|  17280|2012-12-27|     1|  1|  12| 20|      0|      4|         1|         1|0.24|0.2424| 0.6|   0.1642|    12|        79| 91|
|   7267|2011-11-04|     4|  0|  11|  9|      0|      5|         1|         2|0.42|0.4242|0.71|   0.4627|    15|       239|254|
|   1400|2011-03-03|     1|  0|   3| 14|      0|      4|         1|         1|0.24|0.2576|0.21|   0.1045|    18|        60| 78|
|  17097|2012-12-20|     4|  1|  12|  3|      0|      4|         1|         2| 0.3|0.3182| 0.7|   0.0896|     1|         3|  4|
|  16246|2012-11-14|     4|  1|  11| 15|      0|      3|         1|         1|0.36|0.3333|0.43|   0.2836|    32|       228|260|
+-------+----------+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+---+
only showing top 20 rows


In [12]:
bikes_df.printSchema()


root
 |-- instant: string (nullable = false)
 |-- dteday: string (nullable = false)
 |-- season: string (nullable = false)
 |-- yr: string (nullable = false)
 |-- mnth: string (nullable = false)
 |-- hr: string (nullable = false)
 |-- holiday: string (nullable = false)
 |-- weekday: string (nullable = false)
 |-- workingday: string (nullable = false)
 |-- weathersit: string (nullable = false)
 |-- temp: string (nullable = false)
 |-- atemp: string (nullable = false)
 |-- hum: string (nullable = false)
 |-- windspeed: string (nullable = false)
 |-- casual: string (nullable = false)
 |-- registered: string (nullable = false)
 |-- cnt: string (nullable = false)
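
Every column came through as a string, because that's what our StructFields said. When you need numeric work, cast; a sketch (not applied here, we'll keep the string schema for this walkthrough):

In [ ]:
# cast the total-rides column to an integer for numeric operations
bikes_df_cast = bikes_df.withColumn('cnt', bikes_df['cnt'].cast('int'))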


In [13]:
# now we can look for trends + data quality questions...

# total # of rows in the DataFrame
num_rows = bikes_df.count()

# number of distinct rows in the DataFrame
num_distinct = bikes_df.distinct().count()

# and we can start to see how PySpark returns plain Python objects we can use locally
print "count() returns a python object of type " + str(type(num_rows))
print "number of duplicate rows in the DataFrame: " + str(num_rows - num_distinct)


count() returns a python object of type <type 'int'>
number of duplicate rows in the DataFrame: 0
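
Had there been duplicates, dropping them is a one-liner; a sketch (a no-op on this data):

In [ ]:
# keep only distinct rows (Spark 1.4+)
bikes_df_dedup = bikes_df.dropDuplicates()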

In [14]:
# check out some more df methods
bikes_df.groupBy('holiday').count().show()


+-------+-----+
|holiday|count|
+-------+-----+
|      0|16879|
|      1|  500|
+-------+-----+
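
groupBy composes with agg for more than counts, e.g. the normalized temperature range per season; a sketch, casting the string column first:

In [ ]:
# min/max normalized temp per season; alias the imports so they
# don't shadow python's builtin min/max
from pyspark.sql.functions import min as fmin, max as fmax
bikes_df.groupBy('season').agg(fmin(bikes_df['temp'].cast('double')),
                               fmax(bikes_df['temp'].cast('double'))).show()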


In [15]:
# let's look at trips in July
july_trips = bikes_df.filter(bikes_df['mnth'] == 7)

# since we'll be querying this subset quite a bit, let's persist the DataFrame in memory
july_trips.persist()


Out[15]:
DataFrame[instant: string, dteday: string, season: string, yr: string, mnth: string, hr: string, holiday: string, weekday: string, workingday: string, weathersit: string, temp: string, atemp: string, hum: string, windspeed: string, casual: string, registered: string, cnt: string]
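
persist() with no arguments uses the default storage level; if memory is tight, you can ask Spark to spill to disk instead. A sketch (you'd use this in place of the bare persist() above, since Spark won't change the level once one is set):

In [ ]:
# explicit storage level: keep what fits in memory, spill the rest to disk
from pyspark import StorageLevel
july_trips.persist(StorageLevel.MEMORY_AND_DISK)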

In [16]:
july_trips.count()


Out[16]:
1488

In [17]:
july_trips.show()


+-------+----------+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+---+
|instant|    dteday|season| yr|mnth| hr|holiday|weekday|workingday|weathersit|temp| atemp| hum|windspeed|casual|registered|cnt|
+-------+----------+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+---+
|   4631|2011-07-16|     3|  0|   7| 20|      0|      6|         0|         1|0.72|0.6667|0.51|   0.2239|   108|       188|296|
|  13677|2012-07-29|     3|  1|   7|  1|      0|      0|         0|         1|0.66|0.6061|0.83|   0.1045|    49|       109|158|
|  13060|2012-07-03|     3|  1|   7|  8|      0|      2|         1|         1|0.74|0.6818|0.62|   0.0896|    42|       604|646|
|  13219|2012-07-09|     3|  1|   7| 23|      0|      1|         1|         2| 0.7|0.6515|0.65|   0.1045|    22|       109|131|
|  13233|2012-07-10|     3|  1|   7| 13|      0|      2|         1|         1|0.82|0.7273|0.38|    0.194|    77|       203|280|
|   4368|2011-07-05|     3|  0|   7| 21|      0|      2|         1|         1|0.78|0.7424|0.62|   0.2537|    77|       168|245|
|  13481|2012-07-20|     3|  1|   7| 21|      0|      5|         1|         3|0.62|0.5758|0.83|   0.3284|    14|       108|122|
|   4662|2011-07-18|     3|  0|   7|  3|      0|      1|         1|         1|0.66|0.6061|0.78|    0.194|     3|         4|  7|
|   4456|2011-07-09|     3|  0|   7| 13|      0|      6|         0|         1|0.82|0.7424|0.41|   0.2239|   167|       249|416|
|  13591|2012-07-25|     3|  1|   7| 11|      0|      3|         1|         1|0.76|0.6667|0.33|        0|    79|       202|281|
|  13444|2012-07-19|     3|  1|   7|  8|      0|      4|         1|         1|0.76|0.7121|0.58|   0.2537|    32|       625|657|
|  13355|2012-07-15|     3|  1|   7| 15|      0|      0|         0|         1|0.86| 0.803|0.47|   0.1343|   182|       307|489|
|  13462|2012-07-20|     3|  1|   7|  2|      0|      5|         1|         2|0.66|0.5909|0.89|   0.0896|     5|        14| 19|
|  13500|2012-07-21|     3|  1|   7| 16|      0|      6|         0|         3| 0.6|0.5455|0.88|    0.194|   130|       196|326|
|  13282|2012-07-12|     3|  1|   7| 14|      0|      4|         1|         1|0.78|0.6818|0.33|   0.2239|    42|       167|209|
|  13077|2012-07-04|     3|  1|   7|  1|      1|      3|         0|         1|0.68|0.6364|0.74|        0|    27|        96|123|
|   4953|2011-07-30|     3|  0|   7|  6|      0|      6|         0|         1|0.72|0.6818| 0.7|   0.2985|     6|        18| 24|
|   4666|2011-07-18|     3|  0|   7|  7|      0|      1|         1|         1|0.68|0.6364|0.74|   0.2239|    22|       255|277|
|   4282|2011-07-02|     3|  0|   7|  7|      0|      6|         0|         1|0.64|0.6061|0.65|        0|    10|        35| 45|
|   4319|2011-07-03|     3|  0|   7| 20|      0|      0|         0|         3|0.66|0.6061|0.83|        0|    83|        93|176|
+-------+----------+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+---+
only showing top 20 rows


In [ ]:
# what else would you examine here?
# more functions can be found in the DataFrame documentation (listed in refs)
# one idea is sketched in the next cell
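
One idea to get started: which days of the week see the most July trips? A sketch:

In [ ]:
# trip records per weekday in July, sorted by weekday
july_trips.groupBy('weekday').count().orderBy('weekday').show()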

In [18]:
# when we are done working with data, remove from memory
july_trips.unpersist()


Out[18]:
DataFrame[instant: string, dteday: string, season: string, yr: string, mnth: string, hr: string, holiday: string, weekday: string, workingday: string, weathersit: string, temp: string, atemp: string, hum: string, windspeed: string, casual: string, registered: string, cnt: string]

Markdown is useful for analysis notes, directions, and making jokes...

You can also reference songs you like, which are more fun for WordCount() than README.md


In [19]:
# create an RDD from music lyrics + perform the classic WordCount()
from operator import add

lines = sc.textFile(WORK_DIR + "data/music/machete - amanda palmer")
counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
output = counts.collect()
for (word, count) in output:
    print "%s: %i" % (word, count)


: 101
blind: 7
liked: 8
love: 8
just: 3
from: 1
doing: 1
had: 1
box: 8
dead: 1
past: 1
nothing: 2
see: 3
through: 2
have: 8
need: 4
my: 1
guess: 1
kind: 7
alive: 1
what: 4
paradox: 7
symbols: 1
head: 1
oceanside: 1
won’t: 1
seems: 3
chest: 1
here: 4
robbed: 7
behind: 1
i’ll: 1
wrapped: 1
knives: 8
you: 53
feather: 1
was: 15
?: 1
nothing’s: 2
withstood: 7
a: 18
them: 3
stood: 1
hacked: 1
beauty: 1
i’m: 3
collapsed: 1
magic: 1
little: 1
but: 9
me: 8
didn’t: 4
not: 2
off: 5
machete: 5
with: 3
learned: 7
died: 2
case: 1
sliced: 2
and: 25
can’t: 1
kept: 1
keeping: 1
i: 29
where: 1
dumbo’s: 1
as: 2
vines: 1
will: 1
this: 3
so: 10
free: 1
drill: 1
anti: 1
the: 18
soft: 1
couldn’t: 1
now: 1
because: 7
up: 3
hard: 1
it: 6
felt: 1
you’re: 7
surrounding: 1
woods: 1
carved: 1
in: 3
piling: 1
heading: 1
your: 7
out: 3
or: 1
beatings: 1
said: 13
died.: 1
to: 20
things: 1
tried: 1
it’s: 6
arms: 1
who: 1
fight: 7
how: 5
that: 16
parts: 1
don’t: 7
laughed: 1
many: 1
really: 2
worth: 2
boo: 1
do: 6
never: 10
around: 1
get: 6
going: 3
believe: 2
safe: 7
yesterday: 3
took: 6
cut: 4
cutting: 1
if: 4
blood: 1
dock: 1
seriously,: 1
there’s: 1
path: 1
stuff?: 3
day: 1
on: 2
no: 1
like: 4
covering: 1
of: 18
childhood: 7
drop: 1
terrible: 1
keep: 2
work: 2
matter: 2
cried:: 1
were: 1
making: 1
you’ve: 1
called: 1
know: 7
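
collect() pulls every (word, count) pair back to the driver, unsorted; note the 101 counts of the empty string, an artifact of splitting on spaces. If you only want the top of the list, takeOrdered sorts on the cluster; a sketch:

In [ ]:
# top 10 non-empty words by count, descending
counts.filter(lambda wc: wc[0]) \
      .takeOrdered(10, key=lambda wc: -wc[1])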